{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Online serving for DLinear model using Ray Serve\n",
    "\n",
    "\n",
    "<div align=\"left\">\n",
    "<a target=\"_blank\" href=\"https://console.anyscale.com/\"><img src=\"https://img.shields.io/badge/🚀 Run_on-Anyscale-9hf\"></a>&nbsp;\n",
    "<a href=\"https://github.com/anyscale/e2e-timeseries\" role=\"button\"><img src=\"https://img.shields.io/static/v1?label=&amp;message=View%20On%20GitHub&amp;color=586069&amp;logo=github&amp;labelColor=2f363d\"></a>&nbsp;\n",
    "</div>\n",
    "\n",
    "This tutorial launches an online service that:\n",
    "- deploys trained DLinear model artifacts to generate time series predictions\n",
    "- autoscales based on real-time incoming traffic\n",
    "- covers observability and debugging around the service\n",
    "\n",
    "Note that this notebook requires that you run the [Distributed training of a DLinear model](./01-Distributed-Training.ipynb) tutorial to generate the pre-trained model artifacts that this tutorial fetches.\n",
    "\n",
    "\n",
    "[Ray Serve](https://docs.ray.io/en/latest/serve/index.html) is a highly scalable and flexible model serving library for building online inference APIs. You can:\n",
    "\n",
    "- Wrap models and business logic as separate [serve deployments](https://docs.ray.io/en/latest/serve/key-concepts.html#deployment) and [connect](https://docs.ray.io/en/latest/serve/model_composition.html) them together (pipeline, ensemble, etc.)\n",
    "- Avoid a single large service that's network- and compute-bound and uses resources inefficiently\n",
    "- Utilize fractional heterogeneous [resources](https://docs.ray.io/en/latest/serve/resource-allocation.html), which **isn't possible** with SageMaker, Vertex, KServe, etc., and horizontally scale with `num_replicas`\n",
    "- [Autoscale](https://docs.ray.io/en/latest/serve/autoscaling-guide.html) up and down based on traffic\n",
    "- Integrate with [FastAPI and HTTP](https://docs.ray.io/en/latest/serve/http-guide.html)\n",
    "- Set up a [gRPC service](https://docs.ray.io/en/latest/serve/advanced-guides/grpc-guide.html#set-up-a-grpc-service) to build distributed systems and microservices\n",
    "- Enable [dynamic batching](https://docs.ray.io/en/latest/serve/advanced-guides/dyn-req-batch.html) based on batch size, time, etc.\n",
    "- Access a suite of [utilities for serving LLMs](https://docs.ray.io/en/latest/serve/llm/serving-llms.html) that are inference-engine agnostic and have batteries-included support for LLM-specific features such as multi-LoRA support\n",
    "\n",
    "<img src=\"https://github.com/anyscale/e2e-timeseries/blob/main/images/ray_serve.png?raw=true\" width=600>\n",
    "\n",
    "## Set up the environment\n",
    "\n",
    "First, import the necessary modules and set up the environment for Ray Serve deployment:\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import asyncio\n",
    "import os\n",
    "\n",
    "import aiohttp\n",
    "import numpy as np\n",
    "import pandas as pd\n",
    "import requests\n",
    "import torch\n",
    "from fastapi import FastAPI"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Remove this setting when it becomes the default in a future release.\n",
    "os.environ[\"RAY_TRAIN_V2_ENABLED\"] = \"1\"\n",
    "\n",
    "# Now it's safe to import from Ray.\n",
    "import ray\n",
    "from ray import serve\n",
    "from starlette.requests import Request"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Initialize the Ray cluster with the `e2e_timeseries` module, so that newly spawned workers can import from it."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import e2e_timeseries\n",
    "from e2e_timeseries.model import DLinear\n",
    "\n",
    "ray.init(runtime_env={\"py_modules\": [e2e_timeseries]})"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create a Ray Serve deployment\n",
    "\n",
    "Next, define the Ray Serve endpoint for the DLinear model. This implementation uses a reusable class to avoid reloading the model for each request. The deployment supports both Pythonic and HTTP requests with dynamic batching for efficient inference."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "DEPLOYMENT_NAME = \"dlinear-ett-server\"\n",
    "\n",
    "# Create a FastAPI app that adds endpoints to the Serve deployment.\n",
    "app = FastAPI(title=\"DLinear\", description=\"predict future oil temperatures\", version=\"0.1\")\n",
    "\n",
    "\n",
    "@serve.deployment(num_replicas=1, ray_actor_options={\"num_cpus\": 1, \"num_gpus\": 1})\n",
    "@serve.ingress(app)\n",
    "class DLinearModelServe:\n",
    "    def __init__(self, model_checkpoint_path: str):\n",
    "        checkpoint = torch.load(model_checkpoint_path, map_location=torch.device(\"cpu\"))  # Load to CPU first\n",
    "        self.args = checkpoint[\"train_args\"]\n",
    "        self.device = torch.device(\"cuda\" if torch.cuda.is_available() else \"cpu\")\n",
    "        print(f\"Using device: {self.device}\")\n",
    "\n",
    "        # Load model from checkpoint.\n",
    "        self.model = DLinear(self.args).float()\n",
    "        self.model.load_state_dict(checkpoint[\"model_state_dict\"])\n",
    "        print(f\"Model loaded successfully from {model_checkpoint_path}\")\n",
    "\n",
    "        self.model.to(self.device)\n",
    "        self.model.eval()\n",
    "\n",
    "    @serve.batch(max_batch_size=32, batch_wait_timeout_s=0.1)\n",
    "    async def predict_batch(self, batch_x: list[list[float]]) -> list[list[float]]:\n",
    "        \"\"\"\n",
    "        Expects a list of series, where each series is a 1D list of floats/integers,\n",
    "        e.g., [[0.1, 0.2, ..., 0.N], [0.3, 0.4, ..., 0.M]].\n",
    "        \"\"\"\n",
    "\n",
    "        # Convert list of 1D series to a 2D numpy array (batch_size, seq_len).\n",
    "        batch_x = np.array(batch_x, dtype=np.float32)\n",
    "        batch_x = torch.from_numpy(batch_x).float().to(self.device)\n",
    "\n",
    "        # Ensure batch_x is 3D: (batch_size, seq_len, num_features)\n",
    "        # For univariate 'S' models, num_features is 1.\n",
    "        if batch_x.ndim == 2:\n",
    "            batch_x = batch_x.unsqueeze(-1)\n",
    "\n",
    "        with torch.no_grad():\n",
    "            outputs = self.model(batch_x)\n",
    "            # Output shape: (batch_size, pred_len, features_out)\n",
    "\n",
    "        # Slice out the prediction-length portion of the output; the final ':'\n",
    "        # keeps all output features.\n",
    "        # For 'S' (single-feature) forecasting, DLinear typically outputs 1 feature;\n",
    "        # for 'M' (multi-feature) forecasting, it outputs multiple features.\n",
    "        outputs = outputs[:, -self.args[\"pred_len\"] :, :]\n",
    "\n",
    "        # For 'S' (single-feature) forecasting, the output has an explicit last\n",
    "        # dimension of 1; squeeze it so each prediction is returned as a 1D series\n",
    "        # (i.e., the endpoint returns a list of lists of floats).\n",
    "        if outputs.shape[-1] == 1:\n",
    "            outputs = outputs.squeeze(-1)  # Shape: (batch_size, pred_len)\n",
    "\n",
    "        outputs_list = outputs.cpu().numpy().tolist()\n",
    "        return outputs_list\n",
    "\n",
    "    @app.post(\"/predict\")\n",
    "    async def predict_endpoint(self, request: Request):\n",
    "        \"\"\"\n",
    "        Expects a JSON body, which is a list of floats/integers.\n",
    "        e.g., [0.1, 0.2, ..., 0.N]\n",
    "        where N must equal self.args[\"seq_len\"].\n",
    "        \"\"\"\n",
    "        try:\n",
    "            input_data = await request.json()\n",
    "            if not isinstance(input_data, list):\n",
    "                return {\"error\": \"Invalid input. JSON list of numbers expected.\"}\n",
    "            if len(input_data) != self.args[\"seq_len\"]:\n",
    "                return {\"error\": f\"Invalid series length. Expected {self.args['seq_len']}, got {len(input_data)}.\"}\n",
    "\n",
    "        except Exception as e:\n",
    "            return {\"error\": f\"Failed to parse JSON request: {str(e)}\"}\n",
    "\n",
    "        # Pass the single series directly to predict_batch; @serve.batch collects\n",
    "        # concurrent calls into a batch behind the scenes, and the await returns\n",
    "        # the individual result for this input.\n",
    "        single_prediction_output = await self.predict_batch(input_data)\n",
    "\n",
    "        # single_prediction_output is expected to be a list[float] (the prediction for one series)\n",
    "        return single_prediction_output\n",
    "\n",
    "    # Expose get_seq_len as a GET endpoint.\n",
    "    @app.get(\"/seq_len\")\n",
    "    async def get_sequence_length(self):\n",
    "        return {\"seq_len\": self.args[\"seq_len\"]}"
   ]
  },
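  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To build intuition for the tensor shapes that `predict_batch` handles, the following standalone NumPy sketch mirrors the same reshaping steps: a batch of 1D series gains a trailing feature dimension, and the output keeps only the final `pred_len` steps. The sizes and the identity \"model\" here are illustrative stand-ins, not the real DLinear forward pass:\n",
    "\n",
    "```python\n",
    "import numpy as np\n",
    "\n",
    "# Hypothetical sizes; the real values come from the training args in the checkpoint.\n",
    "seq_len, pred_len = 8, 3\n",
    "\n",
    "# Two univariate series, as the /predict endpoint receives them.\n",
    "batch = np.arange(2 * seq_len, dtype=np.float32).reshape(2, seq_len)\n",
    "\n",
    "# Add the trailing feature dimension the model expects: (batch_size, seq_len, 1).\n",
    "if batch.ndim == 2:\n",
    "    batch = batch[:, :, None]\n",
    "assert batch.shape == (2, seq_len, 1)\n",
    "\n",
    "# Stand-in for the model forward pass (identity over time steps).\n",
    "outputs = batch.copy()\n",
    "\n",
    "# Keep the last pred_len steps, then squeeze the singleton feature dimension\n",
    "# so each prediction comes back as a 1D series.\n",
    "outputs = outputs[:, -pred_len:, :]\n",
    "if outputs.shape[-1] == 1:\n",
    "    outputs = outputs[:, :, 0]\n",
    "assert outputs.shape == (2, pred_len)\n",
    "```\n"
   ]
  },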
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<div class=\"alert alert-block alert\"> <b>Model composition</b>\n",
    "\n",
    "Ray Serve makes it easy to do [model composition](https://docs.ray.io/en/latest/serve/model_composition.html), where you can compose multiple deployments containing ML models or business logic into a single application. You can independently scale and configure each deployment, down to fractional resources.\n",
    "\n",
    "<img src=\"https://raw.githubusercontent.com/anyscale/foundational-ray-app/refs/heads/main/images/serve_composition.png\" width=800>\n",
    "\n",
    "</div>\n",
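    "\n",
    "As a concrete sketch of the pattern (illustrative only; the deployment names, resource values, and toy logic below are hypothetical and not part of this tutorial's application), one deployment can hold a handle to another and await calls on it:\n",
    "\n",
    "```python\n",
    "from ray import serve\n",
    "from ray.serve.handle import DeploymentHandle\n",
    "\n",
    "\n",
    "@serve.deployment(num_replicas=2, ray_actor_options={\"num_cpus\": 0.5})\n",
    "class Preprocessor:\n",
    "    def transform(self, series: list[float]) -> list[float]:\n",
    "        # Toy normalization; a real pipeline would reuse training-time scaling.\n",
    "        mean = sum(series) / len(series)\n",
    "        return [x - mean for x in series]\n",
    "\n",
    "\n",
    "@serve.deployment(num_replicas=1, ray_actor_options={\"num_gpus\": 1})\n",
    "class Forecaster:\n",
    "    def __init__(self, preprocessor: DeploymentHandle):\n",
    "        self.preprocessor = preprocessor\n",
    "\n",
    "    async def __call__(self, series: list[float]) -> float:\n",
    "        scaled = await self.preprocessor.transform.remote(series)\n",
    "        return sum(scaled[-3:]) / 3  # stand-in for a model forward pass\n",
    "\n",
    "\n",
    "# Each deployment scales and is resourced independently.\n",
    "composed_app = Forecaster.bind(Preprocessor.bind())\n",
    "# serve.run(composed_app, name=\"composed-example\")\n",
    "```\n",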
    "\n",
    "## Load the model and start the service\n",
    "\n",
    "Load the trained DLinear model and start the Ray Serve deployment. The model checkpoint path is read from the metadata file created during training:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Load the best checkpoint path from the metadata file created in the training notebook.\n",
    "best_checkpoint_metadata_fpath = \"/mnt/cluster_storage/checkpoints/best_checkpoint_path.txt\"\n",
    "with open(best_checkpoint_metadata_fpath, \"r\") as f:\n",
    "    best_checkpoint_path = f.read().strip()\n",
    "\n",
    "\n",
    "def serve_model(best_checkpoint_path):\n",
    "    dlinear_app = DLinearModelServe.bind(model_checkpoint_path=best_checkpoint_path)\n",
    "\n",
    "    # The route_prefix applies to all routes within the FastAPI app.\n",
    "    serve.run(dlinear_app, name=DEPLOYMENT_NAME, route_prefix=\"/predict_dlinear\")\n",
    "    print(f\"DLinear model deployment '{DEPLOYMENT_NAME}' is running with FastAPI app.\")\n",
    "    print(\"  Prediction endpoint: http://127.0.0.1:8000/predict_dlinear/predict\")\n",
    "    print(\"  Sequence length endpoint: http://127.0.0.1:8000/predict_dlinear/seq_len\")\n",
    "\n",
    "    print(\"\\nTo stop the service, call serve.shutdown() in this notebook.\")\n",
    "\n",
    "\n",
    "serve_model(best_checkpoint_path)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "You should see logs indicating that the service is running locally:\n",
    "\n",
    "```bash\n",
    "INFO 2025-04-09 14:06:55,760 serve 31684 -- Started Serve in namespace \"serve\".\n",
    "INFO 2025-04-09 14:06:57,875 serve 31684 -- Application 'dlinear-ett-server' is ready at http://127.0.0.1:8000/.\n",
    "```\n",
    "\n",
    "## Test the service\n",
    "\n",
    "Test the deployed DLinear model with both single requests and concurrent batch requests to demonstrate the dynamic batching capabilities:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "async def test_serve():\n",
    "    # --- Example Client Code, which can be run in a separate script or after serve starts ---\n",
    "\n",
    "    # Base URL for the service.\n",
    "    base_url = \"http://127.0.0.1:8000/predict_dlinear\"\n",
    "    seq_len_url = f\"{base_url}/seq_len\"\n",
    "    predict_url = f\"{base_url}/predict\"\n",
    "\n",
    "    # Get the proper seq_len for the deployed model.\n",
    "    response = requests.get(seq_len_url)\n",
    "    response.raise_for_status()\n",
    "    seq_len_data = response.json()\n",
    "    seq_len = seq_len_data.get(\"seq_len\")\n",
    "\n",
    "    # Load sample data for demonstration purposes.\n",
    "    df = pd.read_csv(\"s3://air-example-data/electricity-transformer/ETTh2.csv\")\n",
    "    ot_series = df[\"OT\"].tolist()\n",
    "\n",
    "    # Create a single sample request from the loaded data.\n",
    "    sample_input_series = ot_series[:seq_len]\n",
    "    sample_request_body = sample_input_series\n",
    "\n",
    "    print(\"\\n--- Sending Single Synchronous Request to /predict endpoint ---\")\n",
    "    response = requests.post(predict_url, json=sample_request_body)\n",
    "    response.raise_for_status()\n",
    "    prediction = response.json()\n",
    "    print(f\"Prediction (first 5 values): {prediction[:5]}\")\n",
    "\n",
    "    print(\"\\n--- Sending Batch Asynchronous Requests to /predict endpoint ---\")\n",
    "    sample_input_list = [sample_input_series] * 100  # Use identical requests\n",
    "\n",
    "    async def fetch(session, url, data):\n",
    "        async with session.post(url, json=data) as response:\n",
    "            response.raise_for_status()\n",
    "            return await response.json()\n",
    "\n",
    "    async def fetch_all_concurrently(requests_to_send: list):\n",
    "        async with aiohttp.ClientSession() as session:\n",
    "            tasks = [fetch(session, predict_url, input_data) for input_data in requests_to_send]\n",
    "            responses = await asyncio.gather(*tasks, return_exceptions=True)\n",
    "            return responses\n",
    "\n",
    "    predictions = await fetch_all_concurrently(sample_input_list)\n",
    "    num_ok = sum(1 for p in predictions if not isinstance(p, Exception))\n",
    "    print(f\"Finished {num_ok}/{len(sample_input_list)} predictions successfully\")\n",
    "\n",
    "\n",
    "# Running this code in a notebook creates an asyncio event loop in the global scope.\n",
    "# So, use await directly.\n",
    "await test_serve()\n",
    "# Use `asyncio.run(test_serve())` instead if running the code in a script."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Ray Serve's dynamic batching automatically groups incoming requests into batches to maximize throughput and hardware utilization while keeping latency low.\n",
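    "\n",
    "To see the mechanism in miniature, this self-contained asyncio sketch (a toy stand-in for `@serve.batch`; the class and sizes are hypothetical, not Ray Serve internals) shows how concurrent awaiting callers get coalesced: the first arrival opens a batch, and anything that arrives within the wait window joins it:\n",
    "\n",
    "```python\n",
    "import asyncio\n",
    "\n",
    "\n",
    "class ToyBatcher:\n",
    "    \"\"\"Toy illustration of dynamic batching; not Ray Serve's implementation.\"\"\"\n",
    "\n",
    "    def __init__(self, max_batch_size: int = 32, wait_s: float = 0.05):\n",
    "        self.max_batch_size = max_batch_size\n",
    "        self.wait_s = wait_s\n",
    "        self.queue: asyncio.Queue = asyncio.Queue()\n",
    "        self.batch_sizes: list[int] = []  # records how requests were grouped\n",
    "\n",
    "    async def submit(self, item: int) -> int:\n",
    "        # Each caller enqueues one item and awaits its individual result.\n",
    "        fut = asyncio.get_running_loop().create_future()\n",
    "        await self.queue.put((item, fut))\n",
    "        return await fut\n",
    "\n",
    "    async def run_once(self) -> None:\n",
    "        # Block for the first item, then drain more until the window closes.\n",
    "        batch = [await self.queue.get()]\n",
    "        deadline = asyncio.get_running_loop().time() + self.wait_s\n",
    "        while len(batch) < self.max_batch_size:\n",
    "            timeout = deadline - asyncio.get_running_loop().time()\n",
    "            if timeout <= 0:\n",
    "                break\n",
    "            try:\n",
    "                batch.append(await asyncio.wait_for(self.queue.get(), timeout))\n",
    "            except asyncio.TimeoutError:\n",
    "                break\n",
    "        self.batch_sizes.append(len(batch))\n",
    "        # One \"model call\" over the whole batch, then fan results back out.\n",
    "        results = [x * 2 for x, _ in batch]\n",
    "        for (_, fut), result in zip(batch, results):\n",
    "            fut.set_result(result)\n",
    "\n",
    "\n",
    "async def demo() -> tuple[list[int], list[int]]:\n",
    "    batcher = ToyBatcher()\n",
    "    worker = asyncio.create_task(batcher.run_once())\n",
    "    results = await asyncio.gather(*(batcher.submit(i) for i in range(8)))\n",
    "    await worker\n",
    "    return results, batcher.batch_sizes\n",
    "\n",
    "\n",
    "# In a notebook, use `results, sizes = await demo()` instead of asyncio.run.\n",
    "results, sizes = asyncio.run(demo())\n",
    "print(results, sizes)\n",
    "```\n",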
    "\n",
    "<div class=\"alert alert-block alert\"> <b>Observability for services</b>\n",
    "\n",
    "The Ray dashboard automatically captures observability for Ray Serve applications in the [Serve view](https://docs.ray.io/en/latest/ray-observability/getting-started.html#serve-view). You can view the service [deployments and their replicas](https://docs.ray.io/en/latest/serve/key-concepts.html#serve-key-concepts-deployment) and time-series metrics about the service's health.\n",
    "\n",
    "<img src=\"https://raw.githubusercontent.com/anyscale/e2e-timeseries/refs/heads/main/images/serve_dashboard.png\" width=800>\n",
    "\n",
    "</div>\n",
    "\n",
    "## Production deployment considerations\n",
    "\n",
    "<div class=\"alert alert-block alert\"> <b>Anyscale Services</b>\n",
    "\n",
    "[Anyscale Services](https://docs.anyscale.com/platform/services/) offers a fault tolerant, scalable, and optimized way to serve Ray Serve applications. See the [API ref](https://docs.anyscale.com/reference/service-api/) for more details. You can:\n",
    "- [roll out and update](https://docs.anyscale.com/platform/services/update-a-service) services with canary deployments and zero-downtime upgrades.\n",
    "- [monitor](https://docs.anyscale.com/platform/services/monitoring) services through a dedicated service page, unified log viewer, tracing, alerting, etc.\n",
    "- scale a service with `num_replicas=auto` and utilize replica compaction to consolidate fractionally utilized nodes.\n",
    "- get [head node fault tolerance](https://docs.anyscale.com/platform/services/production-best-practices#head-node-ft). OSS Ray recovers from failed workers and replicas but not head node crashes.\n",
    "- serve [multiple applications](https://docs.anyscale.com/platform/services/multi-app) in a single service.\n",
    "\n",
    "<img src=\"https://raw.githubusercontent.com/anyscale/e2e-timeseries/refs/heads/main/images/canary.png\" width=1000>\n",
    "\n",
    "[RayTurbo Serve](https://docs.anyscale.com/rayturbo/rayturbo-serve) on Anyscale has more capabilities on top of Ray Serve:\n",
    "- **fast autoscaling and model loading** to get services up and running faster, with [5x improvements](https://www.anyscale.com/blog/autoscale-large-ai-models-faster) even for LLMs\n",
    "- 54% **higher QPS** and up to 3x **streaming tokens per second** for high-traffic serving use cases with no proxy bottlenecks\n",
    "- **replica compaction** into fewer nodes where possible to reduce resource fragmentation and improve hardware utilization\n",
    "- **zero-downtime** [incremental rollouts](https://docs.anyscale.com/platform/services/update-a-service/#resource-constrained-updates) so the service is never interrupted\n",
    "- [**different environments**](https://docs.anyscale.com/platform/services/multi-app/#multiple-applications-in-different-containers) for each service in a multi-serve application\n",
    "- **multi availability-zone** aware scheduling of Ray Serve replicas to provide higher redundancy to availability zone failures\n",
    "\n",
    "</div>\n",
    "\n",
    "### Deploying to production\n",
    "\n",
    "For production deployment on Anyscale, you can use the following command:\n",
    "\n",
    "```bash\n",
    "# Production online service.\n",
    "anyscale service deploy e2e_timeseries.serve:dlinear_model --name=dlinear-ett-forecaster \\\n",
    "  --containerfile=\"${WORKING_DIR}/containerfile\" \\\n",
    "  --working-dir=\"${WORKING_DIR}\" \\\n",
    "  --exclude=\"\"\n",
    "```\n",
    "\n",
    "**Note**: \n",
    "- This example uses a `containerfile` to define dependencies, but you could easily use a pre-built image as well.\n",
    "- You can specify the compute as a [compute config](https://docs.anyscale.com/configuration/compute-configuration/) or inline in a [Service config](https://docs.anyscale.com/reference/service-api/) file.\n",
    "- When you don't specify compute and you launch from a workspace, the default is the compute configuration of the workspace.\n",
    "\n",
    "After the service is running remotely, you need to use the bearer token to query it. You can modify the requests code to use this token:\n",
    "\n",
    "```python\n",
    "# Service specific config. Replace with your own values from the deployment logs.\n",
    "base_url = \"https://dlinear-ett-forecaster-jgz99.cld-kvedzwag2qa8i5bj.s.anyscaleuserdata.com\"\n",
    "token = \"tXhmYYY7qMbrb1ToO9_J3n5_kD7ym7Nirs8djtip7P0\"\n",
    "\n",
    "# Requests config.\n",
    "path = \"/predict_dlinear/predict\"\n",
    "full_url = f\"{base_url}{path}\"\n",
    "headers = {\"Authorization\": f\"Bearer {token}\"}\n",
    "\n",
    "prediction = requests.post(full_url, json=sample_input_series, headers=headers).json()\n",
    "```\n",
    "\n",
    "Don't forget to stop the service once it's no longer needed:\n",
    "\n",
    "```bash\n",
    "anyscale service terminate --name dlinear-ett-forecaster\n",
    "```\n",
    "\n",
    "<div class=\"alert alert-block alert\"> <b>CI/CD</b>\n",
    "\n",
    "While Anyscale [Jobs](https://docs.anyscale.com/platform/jobs/) and [Services](https://docs.anyscale.com/platform/services/) are useful atomic concepts that help you productionize workloads, they're also convenient as nodes in a larger ML DAG or [CI/CD workflow](https://docs.anyscale.com/ci-cd/). You can chain Jobs together, store results, and then serve the application with those artifacts. From there, you can trigger updates to the service and retrigger the Jobs based on events, time, etc. While you can use the Anyscale CLI to integrate with any orchestration platform, Anyscale does support some purpose-built integrations like [Airflow](https://docs.anyscale.com/ci-cd/apache-airflow/) and [Prefect](https://github.com/anyscale/prefect-anyscale).\n",
    "\n",
    "<img src=\"https://raw.githubusercontent.com/anyscale/e2e-timeseries/refs/heads/main/images/cicd.png\" width=700>\n",
    "\n",
    "</div>\n"
   ]
  }
 ],
 "metadata": {
  "language_info": {
   "name": "python"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
